concurrent.futures を使ってみよう
Python3.2 から利用できる concurrent.futures は、非同期に実行できる呼び出し可能オブジェクトの高水準のインタフェースを提供します。
もっとわかりやすく説明すると、treading や multiporcessing ではプログラマがスレッドやプロセスを個々に処理する必要がありますが、concurrent.futures では複数のスレッドやプロセスを一括して処理してくれるため、並列処理プログラムの開発がとても楽になります。
concurrent.futuresモジュールには抽象クラス Executor があり、これを継承させた2つのExecutorクラスが提供されています。
タスクを並列実行するためにはこの2つのクラスのうちどちらかを使用します。
ThreadPoolExecutorクラス
スレッドを使って並列タスクを実行します。
ネットワークアクセスやファイル操作などCPU計算をあまり必要としない処理の並列実行に適しています。
ProcessPoolExecutorクラス
プロセスを使って並列タスクを実行します。
CPUに負荷がかかる計算処理などの並列実行に適しています。
これらのクラスのAPIは同じなので、アプリケーションは最小限の変更でスレッドとプロセスを切り替えることができます。
それぞれのクラスのコンストラクタに、max_workers 引数で実行可能なタスクの最大数を与えてインスタンスオブジェクトを生成することができます。デフォルトはプラットフォームのCPU数になります。
生成されるインスタンスオブジェクトには並列タスクを実行する以下のメソッドがあります。
submit()メソッド
1つのタスクを実行キューに追加します。
実行中のタスクがmax_workers未満であれば追加されたタスクは即実行が開始されます。
戻り値のFutureオブジェクトでタスクのキャンセルや実行結果を取得します。
map()メソッド
実行タスクをイテレータで渡します。
戻り値はタスクの実行結果を取得するためのジェネレータです。
shutdown()メソッド
現在ペンディングされているFuturesの実行が終了したときに、使用しているすべてのリソースを解放すべきであることをExecutorに通知します。シャットダウン後に行われたExecutor.submit() と Executor.map() の呼び出しは RuntimeError を発生させます。
code: python
n 2: # %load 01_shutdown.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import random
...: import time
...:
...: def task():
...: print("Executing our Task")
...: time.sleep(5)
...:
...: print('Main Task')
...: executor = ThreadPoolExecutor(max_workers=3)
...: executor.shutdown()
...:
...: try:
...: task1 = executor.submit(task)
...: except RuntimeError as e:
...: print(e)
...:
Main Task
cannot schedule new futures after shutdown
ThreadPoolExecutorクラスとProcessPoolExecutorクラス
`ThreadPoolExecutorは max_workers` 引数で与えたスレッドを非同期実行に使用するためプール(維持)します。
デフォルトでは min(32, os.cpu_count() + 4) スレッドになります。
code: python
In 2: # %load 02_threadpool_submit.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import random
...: import time
...:
...: def task():
...: print("Executing our Task")
...: time.sleep(5)
...: result = 0
...: i = 0
...: for i in range(10):
...: result = result + i
...: print(f"Task Executed {threading.current_thread()}")
...: return result
...:
...: executor = ThreadPoolExecutor(max_workers=3)
...: task1 = executor.submit(task)
...: task2 = executor.submit(task)
...: print('Main Task')
...: print(f'Task1: {task1.result()}')
...: print(f'Task2: {task2.result()}')
...:
Executing our Task
Executing our Task
Main Task
Task Executed <Thread(ThreadPoolExecutor-0_0, started 139946299385600)>
Task1: 45
Task Executed <Thread(ThreadPoolExecutor-0_1, started 139946290206464)>
Task2: 45
ThreadPoolExecutorによるExecutorオブジェクトから返され、submit()によりタスクを実行します。submit()はtタスクの完了をまたずに、タスクの未来の値を表すFuturesオブジェクトを返します。(task1と task2)
このため処理はメインタスクに戻るため Main Taskが出力されています。
Futuresオブジェクトのresult()メソッドを呼び出すことで、タスクの結果を取得することができます。
このとき、タスクがまだ完了していなければメインタスクはブロック(待たされる)されます。
submit()の代わりに map()メソッドでワーカーにタスクを割り当ててみましょう。
code: python
In 2: # %load 03_threadpool_map.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import time
...:
...: def task(n):
...: time.sleep(5)
...: return (n, threading.current_thread())
...:
...: executor = ThreadPoolExecutor(max_workers=2)
...: results = executor.map(task, range(5, 0, -1))
...:
...: for n, tid in results:
...: print(f'ran task {n} in thread {tid}')
...:
...:
ran task 5 in thread <Thread(ThreadPoolExecutor-0_0, started 139790836393728)>
ran task 4 in thread <Thread(ThreadPoolExecutor-0_1, started 139790826952448)>
ran task 3 in thread <Thread(ThreadPoolExecutor-0_0, started 139790836393728)>
ran task 2 in thread <Thread(ThreadPoolExecutor-0_1, started 139790826952448)>
ran task 1 in thread <Thread(ThreadPoolExecutor-0_0, started 139790836393728)>
ThreadPoolExecutorでプールされたワーカースレッドは複数のタスクのために再利用されていることがわかります。
ProcessPoolExecutorはThreadPoolExecutorと同じように機能しますが、スレッドの代わりにプロセスを使用します。
code: python
In 2: # %load 04_processpool_submit.py ...: from concurrent.futures import ProcessPoolExecutor
...: import time
...: import os
...:
...: def task():
...: print("Executing our Task")
...: time.sleep(5)
...: result = 0
...: i = 0
...: for i in range(10):
...: result = result + i
...: print(f"Task Executed PID:{os.getpid()}")
...: return result
...:
...: executor = ProcessPoolExecutor(max_workers=3)
...: task1 = executor.submit(task)
...: task2 = executor.submit(task)
...:
...: print(f'Main Task PID: {os.getpid()}')
...: print(f'Task1: {task1.result()}')
...: print(f'Task2: {task2.result()}')
...:
Main Task PID: 1198206
Executing our Task
Executing our Task
Task Executed PID:1198214
Task1: 45
Task Executed PID:1198217
Task2: 45
submit()の代わりに map()メソッドでワーカーにタスクを割り当ててみましょう。
code: python
In 2: # %load 05_processpool_map.py ...: from concurrent.futures import ProcessPoolExecutor
...: import os
...: import time
...:
...: def task(n):
...: time.sleep(5)
...: return (n, os.getpid())
...:
...: executor = ProcessPoolExecutor(max_workers=2)
...: results = executor.map(task, range(5, 0, -1))
...:
...: for n, pid in results:
...: print(f'ran task {n} in process {pid}')
...:
ran task 5 in process 1190386
ran task 4 in process 1190389
ran task 3 in process 1190386
ran task 2 in process 1190389
ran task 1 in process 1190386
ThreadPoolExecutor と同様に、個々のワーカープロセスは複数のタスクのために再利用されます。
ThreadPoolExecutorとProcessPoolExecutorは、ほどんど同じAPIで使用することができます。両者の違いは、ワーカーを生成する方法がスレッドなのかプロセスなのかということです。
処理の目的に応じて使い分けることになります。
ThreadPoolExecutor:I/Oが多い処理に向いている
ProcessPoolExectuor:GILを回避したいときやCPU計算が多い処理に向いている、
ThreadPoolExecutorの適用事例:
次のコードは、URLで与えたページのコンテンツを返すタスクをスレッドで処理して並列実行しています。
code: python
In 2: # %load 06_threadpool_case_study.py ...: import concurrent.futures
...: import urllib.request
...:
...:
...: def load_url(url, timeout):
...: with urllib.request.urlopen(url, timeout=timeout) as conn:
...: return conn.read()
...:
...: with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
...: future_to_url = {executor.submit(load_url, url, 60): url for url in
...: URLS}
...: for future in concurrent.futures.as_completed(future_to_url):
...: url = future_to_urlfuture ...: try:
...: data = future.result()
...: except Exception as exc:
...: print('%r generated an exception: %s' % (url, exc))
...: else:
...: print('%r page is %d bytes' % (url, len(data)))
...:
ProcessPoolExectuorの適用事例:
与えた数値が素数かどうかを計算して調べるタスクをプロセスを使って生成したワーカーに処理させています。
code: python
In 2: # %load 07_processpool_case_study.py ...: import concurrent.futures
...: import math
...:
...: PRIMES = [
...: 112272535095293,
...: 112582705942171,
...: 112272535095293,
...: 115280095190773,
...: 115797848077099,
...: 1099726899285419]
...:
...: def is_prime(n):
...: if n < 2:
...: return False
...: if n == 2:
...: return True
...: if n % 2 == 0:
...: return False
...:
...: sqrt_n = int(math.floor(math.sqrt(n)))
...: for i in range(3, sqrt_n + 1, 2):
...: if n % i == 0:
...: return False
...: return True
...:
...: with concurrent.futures.ProcessPoolExecutor() as executor:
...: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
...: print('%d is prime: %s' % (number, prime))
...:
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
BrokenThreadPool と BrokenProcessPool
Executor が何らかの理由で壊れ、新しいタスクの投入や実行ができなくなったときに次の例外が発生します。
BrokenThreadPool:
ThreadPoolExecutorがワーカの初期化に失敗したときに発行される(Python 3.7〜)
BrokenProcessPool:
ProcessPoolExecutorのワーカの1つが正常に終了されなかったときに発行される(Python 3.3〜)
ワーカープロセスの1つに何かが発生して予期せず終了した場合、ProcessPoolExecutorはワーカーが「壊れている」と判断して、タスクをスケジュールしなくなります。
code: python
In 2: # %load 10_processpool_broken.py ...: import os
...: import signal
...: from concurrent.futures import ProcessPoolExecutor, process
...:
...: with ProcessPoolExecutor(max_workers=2) as executor:
...: print('getting the pid for one worker')
...: future1 = executor.submit(os.getpid)
...: pid1 = future1.result()
...:
...: print(f'killing process {pid1}')
...: os.kill(pid1, signal.SIGHUP)
...:
...: print('submitting another task')
...: future2 = executor.submit(os.getpid)
...: try:
...: pid2 = future2.result()
...: except process.BrokenProcessPool as errmsg:
...: print(f'could not start new tasks: {errmsg}')
...:
getting the pid for one worker
killing process 1190620
submitting another task
could not start new tasks: A process in the process pool was terminated abruptly while the future was running or pending.
BrokenProcessPool例外は、新しいタスクが送信されたときではなく、結果が処理されたときに実際に発行されることに注意してください。
ThreadPoolExecutor のthreading との比較
"スレッドベースの並列処理"でI/Oバウンドでの例で示したdownload_stock_thread.py は次のようなものでした。 code: stock_downloader.py
from loguru import logger
from datetime import datetime
import pandas as pd
import pandas_datareader as pdr
def stock_downloader(symbol, start=None, end=None):
if start is None:
start=datetime(2010, 1, 1)
if end is None:
end=datetime(2020, 12, 31)
df = pdr.DataReader(symbol, 'yahoo', start, end)
df.to_csv(f'{symbol}.csv')
logger.info(f'Downloaded {symbol}')
return df
code: python
In 2: # %load 20_stock_downloader_thread.py ...: from loguru import logger
...: from queue import Queue
...: from threading import Thread
...: from stock_downloader import stock_downloader
...:
...:
...: logger.remove()
...: logger.add('/tmp/stock_downloader.log')
...:
...: class DownloadWorker(Thread):
...: def __init__(self, queue):
...: Thread.__init__(self)
...: self.queue = queue
...:
...: def run(self):
...: while True:
...: symbol = self.queue.get()
...: try:
...: stock_downloader(symbol)
...: finally:
...: self.queue.task_done()
...:
...: # ワーカースレッドと通信するためのキューを作成
...: queue = Queue()
...:
...: # ワーカースレッドを作成 4thread
...: for x in range(4):
...: worker = DownloadWorker(queue)
...: # デーモンモードではワーカーがブロックしている場合でも
...: # メインスレッドが終了する
...: worker.daemon = True
...: worker.start()
...:
...: # ダウンロードする株価コードをキューに入れる
...: for symbol in stock_symbols:
...: queue.put(symbol)
...:
...: # キューがすべてのタスクの処理を終了するのをメインスレッドが待機
...: queue.join()
...:
これを、ThreadPoolExecutorクラスを使って実装すると、次のようにスッキリと記述することができます。
code: python
In 2: # %load 21_stock_downloader_threadpool.py ...: from loguru import logger
...: from concurrent.futures import ThreadPoolExecutor
...: from stock_downloader import stock_downloader
...:
...:
...: logger.remove()
...: logger.add('/tmp/stock_downloader.log')
...:
...: with ThreadPoolExecutor() as executor:
...: executor.map(stock_downloader, stock_symbols, timeout=30)
...:
ThreadPoolExecutor のmultiprocessing との比較
code: python
In 2: # %load 30_pi_multi.py ...: import multiprocessing as mp
...:
...: def calc_partial_pi(rank, nprocs, nsteps, dx):
...: partial_pi = 0.0
...: for i in range(rank, nsteps, nprocs):
...: x = (i + 0.5) * dx
...: partial_pi += 4.0 / (1.0 + x * x)
...: partial_pi *= dx
...: return partial_pi
...:
...: nsteps = 10000000
...: dx = 1.0 / nsteps
...:
...: nprocs = mp.cpu_count()
...:
...: pool = mp.Pool(processes=nprocs)
...: result = pool.starmap(calc_partial_pi, inputs)
...: pi = sum(result)
...: print(pi)
...:
3.141592653589731
これは、multiprocessing で実装したものです。
このコードは重要な箇所はpool.starmap()の呼び出しと、その 入力となる inputsの生成です。
これを ThreadPoolExecutorで実装すると次のようになります。
code: python
In 2: # %load 31_pi_concurrent_threadpool.py ...: from concurrent.futures import ThreadPoolExecutor
...: from functools import partial
...:
...: def calc_partial_pi(rank, nprocs, nsteps, dx):
...: partial_pi = 0.0
...: for i in range(rank, nsteps, nprocs):
...: x = (i + 0.5) * dx
...: partial_pi += 4.0 / (1.0 + x * x)
...: partial_pi *= dx
...: return partial_pi
...:
...: nsteps = 10000000
...: dx = 1.0 / nsteps
...: nprocs = 4
...:
...: calc = partial(calc_partial_pi, nprocs=nprocs, nsteps=nsteps, dx=dx)
...: with ThreadPoolExecutor(max_workers=nprocs) as executor:
...: results = executor.map(calc, range(nprocs))
...:
...: print(pi)
...:
3.141592653589686
multiprocesss での pool.starmap() に相当するものが executor.map()です。
executor.map() は呼び出し可能オブジェクトとイテラブルオブジェクトを受け取りますが、calc_partial_pi()に与えるべき引数をすべてmap()に与えることができないことに注意してください。
そのため、functools.partial()を使っています。
code: python
calc = partial(calc_partial_pi, nprocs=nprocs, nsteps=nsteps, dx=dx)
これにより、calc() は次の呼び出しと同じことになります。
code: python
calc_partial_pi(nprocs=nprocs, nsteps=nsteps, dx=dx)
ProcessPoolExecutor のmultiprocessing との比較
30_pi_multi.py を、今度はProcessPoolExecutorクラスを使って実装してみましょう。
code: python
n 2: # %load 30_pi_concurrent_processpool.py ...: from concurrent.futures import ProcessPoolExecutor
...: from functools import partial
...:
...: def calc_partial_pi(rank, nprocs, nsteps, dx):
...: partial_pi = 0.0
...: for i in range(rank, nsteps, nprocs):
...: x = (i + 0.5) * dx
...: partial_pi += 4.0 / (1.0 + x * x)
...: partial_pi *= dx
...: return partial_pi
...:
...: nsteps = 10000000
...: dx = 1.0 / nsteps
...: nprocs = 4
...:
...: calc = partial(calc_partial_pi, nprocs=nprocs, nsteps=nsteps, dx=dx)
...: with ProcessPoolExecutor(max_workers=4) as executor:
...: results = executor.map(calc, range(nprocs))
...:
...: print(pi)
...:
3.141592653589686
ThreadPoolExectutorの場合と、ほとんど同じように記述できていることがわかります。
Futureオブジェクト
Futureの言葉には「未来」の意味があります。Future オブジェクトはこの「未来」の概念に基づいています。通常のPython関数が呼び出されると、Pythonインタープリタは関数の実行が完了するのを待ち、結果を返します。長時間実行される関数の場合、完了を待つことは望ましくないかもしれません。そうしたときは、関数は非同期に実行される方がよいはずです。
Executorオブジェクトのsubmit()でタスクが実行すると、そのタスクが計算する「未来の値」を表す Futuresオブジェクトがすぐに返されます。タスクが別のワーカーー処理されている間、呼び出したメイン側は他の作業を行うことができます。 メイン側は、後でタスクが完了するのを待って、結果を取得することができます。Future オブジェクトは基本的に並列タスクのステータスを追跡するためのオブジェクトで、ステータスや結果を調べることができるようになっています。
Futureオブジェクトには次のメソッドがあります。
cancel()
呼び出しのキャンセルを試みます。呼び出しが現在実行中または終了していてキャンセルできない場合、このメソッドはFalseを返し、そうでない場合は呼び出しがキャンセルされ、このメソッドはTrueを返します。
cancelled()
呼び出しが正常にキャンセルされた場合 True を返します。
running()
現在呼び出しが実行中でキャンセルできない場合 True を返します。
done()
呼び出しが正常にキャンセルされたか終了した場合 True を返します。
result(timeout=None)
呼び出しによって返された値を返します。呼び出しがまだ完了していない場合、このメソッドは timeout 秒の間待機します。呼び出しが timeout 秒間の間に完了しない場合、 concurrent.futures.TimeoutError が送出されます。 timeout にはintかfloatを指定できます。timeout が指定されていないか、 None である場合、待機時間に制限はありません。
Future が完了する前にキャンセルされた場合 CancelledError が送出されます。
呼び出しが例外を送出した場合、このメソッドは同じ例外を送出します。
exception(timeout=None)
呼び出しによって送出された例外を返します。呼び出しがまだ完了していない場合、このメソッドは timeout 秒だけ待機します。呼び出しが timeout 秒の間に完了しない場合、 concurrent.futures.TimeoutError が送出されます。 timeout にはintかfloatを指定できます。 timeout が指定されていないか、 None である場合、待機時間に制限はありません。
Future が完了する前にキャンセルされた場合 CancelledError が送出されます。
呼び出しが例外を送出することなく完了した場合、None を返します。
add_done_callback(fn)
呼び出し可能な fn オブジェクトを Future にアタッチします。Futureがキャンセルされたか、実行を終了した際に、Future をそのただ一つの引数として fn が呼び出されます。
追加された呼び出し可能オブジェクトは、追加された順番で呼びだされ、追加を行ったプロセスに属するスレッド中で呼び出されます。もし呼び出し可能オブジェクトが Exception のサブクラスを送出した場合、それはログに記録され無視されます。呼び出し可能オブジェクトが BaseException のサブクラスを送出した場合の動作は未定義です。
もしFutureがすでに完了しているか、キャンセル済みであれば、fn は即座に実行されます。
これ以降のサンプルコードでは、ThreadPoolExecutorを使用していますが、ProcessPoolExecutorでも同じです。
個々のタスクのスケジューリング
submit()メソッドを使用して個々のタスクをスケジュールし、返されるFutureオブジェクトを使用してそのタスクの結果を待つことができます。
code: python
In 2: # %load 40_submit.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import time
...:
...:
...: def dummy_task(n):
...: print(f'{threading.current_thread().name}: sleeping {n} sec.')
...: time.sleep(n)
...: print(f'{threading.current_thread().name}: down with {n} sec.')
...: return n
...:
...:
...: print('main: starting')
...: executor = ThreadPoolExecutor(max_workers=2)
...: future = executor.submit(dummy_task, 5)
...:
...: print(f'main: task is done: {future.done()}')
...:
...: print('main: waiting for results')
...: result = future.result()
...: print(f'main: result: {result}')
...:
...: print(f'main: task is done: {future.done()}')
...:
main: starting
ThreadPoolExecutor-0_0: sleeping 5 sec.
main: task is done: False
main: waiting for results
ThreadPoolExecutor-0_0: down with 5 sec.
main: result: 5
main: task is done: True
任意の順序でタスクを待機
Futureオブジェクトのresult()メソッドを呼び出すと、タスクが完了するまで(値を返すか、例外の発生)まで、またはキャンセルされるまでブロックされます。 複数のタスクの結果には、map()メソッドを使用してタスクがスケジュールされた順序でアクセスできます。 結果を処理する順序が重要でない場合は、as_completed()を使用して、各タスクの終了時に結果を処理します。
code: python
In 2: # %load 41_as_complete.py ...: from concurrent.futures import ThreadPoolExecutor, as_completed
...: import random
...: import time
...:
...: def dummy_task(n):
...: time.sleep(random.random())
...: return (n, n/10)
...:
...:
...: print('main: starting')
...: executor = ThreadPoolExecutor(max_workers=4)
...:
...: wait_for = [
...: executor.submit(dummy_task, i)
...: for i in range(5, 0, -1)
...: ]
...:
...: for future in as_completed(wait_for):
...: print(f'main: result: {future.result()}')
...:
...:
main: starting
main: result: (2, 0.2)
main: result: (5, 0.5)
main: result: (4, 0.4)
main: result: (3, 0.3)
main: result: (1, 0.1)
コールバック
タスクが完了したときに、結果を明示的に待たずに何らかのアクションを実行するには、add_done_callback()を使用して、Futureが完了したときに呼び出す新しい関数を指定します。 コールバックは、Futureインスタンスという単一の引数をとる呼び出し可能オブジェクトである必要があります。
code: python
In 2: # %load 42_callback.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import time
...:
...: def dummy_task(n):
...: print(f'{threading.current_thread().name}: sleeping {n} sec.')
...: time.sleep(n)
...: print(f'{threading.current_thread().name}: wake up.')
...: return n / 10
...:
...: def check_done(callback):
...: if callback.cancelled():
...: print(f'callback: {callbak.arg}: canceled')
...: elif callback.done():
...: error = callback.exception()
...: if error:
...: print(f'callback: {callback.arg}: error returned: {error}')
...: else:
...: result = callback.result()
...: print(f'callback: {callback.arg}: value returned: {result}')
...:
...:
...: if __name__ == '__main__':
...: print('main: starting')
...: executor = ThreadPoolExecutor(max_workers=2)
...:
...: future = executor.submit(dummy_task, 5)
...: future.arg = 5
...: future.add_done_callback(check_done)
...:
...: result = future.result()
...: print(f'main: task result: {result}')
...:
...:
main: starting
ThreadPoolExecutor-0_0: sleeping 5 sec.
ThreadPoolExecutor-0_0: wake up.
callback: 5: value returned: 0.5
main: task result: 0.5
コールバックは、Futureが「完了」と見なされる理由に関係なく呼び出されるため、コールバックを使用する前に、コールバックに渡されたオブジェクトのステータスを確認する必要があります。
タスクのキャンセル
Futureは、送信されたが開始されていない場合、cancel()メソッドを呼び出すことでキャンセルできます。
code: python
In 2: # %load 43_callback_cancel.py ...: from concurrent.futures import ThreadPoolExecutor
...: import threading
...: import time
...:
...: def dummy_task(n):
...: print(f'{threading.current_thread().name}: sleeping {n} sec.')
...: time.sleep(n)
...: print(f'{threading.current_thread().name}: wake up.')
...: return n / 10
...:
...: def check_done(callback):
...: if callback.cancelled():
...: print(f'Callback: Task {callback.arg}: canceled')
...: elif callback.done():
...: print(f'Callback: Task {callback.arg}: not canceled')
...:
...: if __name__ == '__main__':
...: executor = ThreadPoolExecutor(max_workers=2)
...: print('main: starting')
...: tasks = []
...:
...: for id in range(5, 0, -1):
...: print(f'main: submitting Task {id}')
...: future = executor.submit(dummy_task, id)
...: future.arg = id
...: future.add_done_callback(check_done)
...: tasks.append((id, future))
...:
...: for id, task in reversed(tasks):
...: if not task.cancel():
...: print(f'main: did not cancel Task {id}')
...:
...: executor.shutdown()
...:
main: starting
main: submitting Task 5
ThreadPoolExecutor-0_0: sleeping 5 sec.
main: submitting Task 4
ThreadPoolExecutor-0_1: sleeping 4 sec.
main: submitting Task 3
main: submitting Task 2
main: submitting Task 1
Callback: Task 1: canceled
Callback: Task 2: canceled
Callback: Task 3: canceled
main: did not cancel Task 4
main: did not cancel Task 5
ThreadPoolExecutor-0_1: wake up.
Callback: Task 4: not canceled
ThreadPoolExecutor-0_0: wake up.
Callback: Task 5: not canceled
Futureオブジェクトのcancel()メソッドは、タスクをキャンセルされたかを示すブール値を返します。
タスクの例外
タスクが未処理の例外を発生させた場合、そのタスクはFutureに保存され、result()メソッドまたはexception()メソッドを介して参照できるようになります。
code: python
In 2: # %load 44_task_exception.py ...: from concurrent.futures import ThreadPoolExecutor
...:
...: def dummy_task(n):
...: print(f'Task: starting')
...: raise ValueError(f'the value {n} is no good')
...:
...:
...: print('main: starting')
...: executor = ThreadPoolExecutor(max_workers=2)
...: future = executor.submit(dummy_task, 5)
...:
...: error = future.exception()
...: print(f'main: error: {error}')
...:
...: try:
...: result = future.result()
...: except ValueError as errmsg:
...: print(f'main: catch ValueError: "{errmsg}"')
...:
main: starting
Task: starting
main: error: the value 5 is no good
main: catch ValueError: "the value 5 is no good"
タスク関数内で未処理の例外が発生した後にresult()が呼び出されると、現在のコンテキストで同じ例外が再発生します。
コンテキストマネージャー
Executorsインスタンスはコンテキストマネージャとして機能し、タスクを同時に実行し、すべてが完了するのを待ちます。 コンテキストマネージャが終了すると、Executorsのshutdown()メソッドが呼び出されます。
code: python
In 2: # %load 45_context_manager.py ...: from concurrent.futures import ThreadPoolExecutor
...:
...: def dummy_task(n):
...: print(n)
...:
...: with ThreadPoolExecutor(max_workers=2) as executor:
...: print('main: starting')
...: executor.submit(dummy_task, 1)
...: executor.submit(dummy_task, 2)
...: executor.submit(dummy_task, 3)
...: executor.submit(dummy_task, 4)
...:
...: print('main: done')
...:
main: starting
1
2
3
4
main: done
コンテキストマネージャでタスクを起動するこのモードは、実行が現在のスコープを離れるときにスレッドまたはプロセスリソースをクリーンアップする必要がある場合に便利です。
まとめ
concurrent.futures はモジュール1つをインポートすることで、スレッドベースとプロセスベースのどちらの並列タスクの実行も行うことができます。また、処理の記述についてもよく似ているため、学習コストが低く目的解決が速くなります。
ただし、ThreadPoolExectutorクラスではスレッドベースでの並列タスクの実行になり、これはつまりPython の GIL に束縛されているままであることには注意が必要です。とわいえ、I/Oバウンドなタスクではスレッドベースでの並列処理も有用であることに変わりわありません。繰り返すことになりますが、スレッドベースでの並列処理では、それぞれのスレッドはメモリ空間を共有できることを忘れてはいけません。プロセス起動時やプロセス間通信でのオーバーヘッドといった問題は意識する必要がありません。
参考